[Apache Hudi]Bootsrap機能を使ってデータを登録する
データアナリティクス事業本部の森脇です。
AWSから、Apache Hudiに関するAmazon EMRのニュースがありました。
New features from Apache Hudi available in Amazon EMR
Apache Hudiのいくつかの新機能がAmazon EMRにて利用可能になったようです。
この機能のうちの「ブートストラップ機能」が便利そうだったので実際に試してみました。
ブートストラップ機能とは
バージョン0.6.0で追加された機能です。
リリースノートから引用:
Bootstrapping existing parquet datasets : Adds support for bootstrapping existing datasets into Hudi, via both Spark datasource writer and deltastreamer tool, with support for reading from Hive, SparkSQL, AWS Athena (prestoDB support coming soon). See RFC-15 for technical details. Note that this is an experimental feature, which will be improved upon further in the 0.6.x versions.
通常、Apache Hudiにデータを登録する際にはSparkのデータフレームを入力にします。
そのため、ファイルを入力にしたい場合一度データフレームに読み込む必要がありました。
また、登録後のparquetファイルはメタデータと実データが混在する状態になっており、入力で使ったファイルとは中身が異なります。
このブートストラップ機能を利用すると「実データ」と「メタデータ」を分離してファイル化することが可能になります。
つまり、元々存在しているデータファイルには変更を行わずにHudiを利用することが可能です。 (※データファイルのフォーマットはparquet形式である必要があります)
試してみる
前述のAWSのブログではAmazon EMRを利用していました。
Glueジョブでも利用可能か試す意味もかねて、今回はGlueジョブを使ってみます。
Glueジョブの設定は、以前のブログと同じようにしました。
データファイル構成
日付毎のファイルを3つ配置し、入力とします。
s3://apache-hudi-bootstrap-test-data/test-data/ 20210311-data.parquet 20210312-data.parquet 20210313-data.parquet
それぞれのファイルの中身は以下のようにしました。
20210311-data.parquet
date | id | data |
---|---|---|
2021/03/11 | 20210311-0001 | データ1 |
2021/03/11 | 20210311-0002 | データ2 |
2021/03/11 | 20210311-0003 | データ3 |
2021/03/11 | 20210311-0004 | データ4 |
2021/03/11 | 20210311-0005 | データ5 |
20210312-data.parquet
date | id | data |
---|---|---|
2021/03/12 | 20210312-0001 | データ6 |
2021/03/12 | 20210312-0002 | データ7 |
2021/03/12 | 20210312-0003 | データ8 |
2021/03/12 | 20210312-0004 | データ9 |
2021/03/12 | 20210312-0005 | データ10 |
20210313-data.parquet
date | id | data |
---|---|---|
2021/03/13 | 20210313-0001 | データ11 |
2021/03/13 | 20210313-0002 | データ12 |
2021/03/13 | 20210313-0003 | データ13 |
2021/03/13 | 20210313-0004 | データ14 |
2021/03/13 | 20210313-0005 | データ15 |
Glueジョブプログラム
前述のデータファイルが格納されたバケットを入力としてApache Hudiにブートストラップします。
ブートストラップしたデータを参照し、あわせてGlueデータカタログにも登録しています。
データ登録後に、
の2点を確認します。
import sys from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from pyspark.sql.session import SparkSession from pyspark.sql.types import StructType from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate() sc = spark.sparkContext glueContext = GlueContext(sc) job = Job(glueContext) job.init(args['JOB_NAME'], args) tableName = 'hudi_test_data_table' # テーブル名 inputPath = "s3://apache-hudi-bootstrap-test-data/test-data/" # parquetファイルが格納されたバケットパス hudiTablePath = f"s3://apache-hudi-bootstrap-test-meta/{tableName}/" # メタデータ(Hudiテーブル)の作成先。 # Hudiのオプション hudi_options = { 'hoodie.table.name': tableName, # テーブル名 # 書き込みオプション 'hoodie.datasource.write.recordkey.field': 'id', # レコードキーのカラム名 'hoodie.datasource.write.table.name': tableName, # テーブル名 'hoodie.datasource.write.operation': 'bootstrap', # ブートストラップにて書き込み # ブートストラップオプション 'hoodie.bootstrap.base.path': inputPath, # データファイルのパス 'hoodie.bootstrap.keygen.class': 'org.apache.hudi.keygen.SimpleKeyGenerator', # キージェネレーター # データカタログ連携オプション(hive_sync) 'hoodie.datasource.hive_sync.enable': 'true', # 連携を有効にする 'hoodie.datasource.hive_sync.database': 'default', # 連携先のデータベース名 'hoodie.datasource.hive_sync.table': tableName, # 連携先のテーブル名 'hoodie.datasource.hive_sync.use_jdbc': 'false' # jdbcを利用すると接続エラーになったのでfalseにする。 } # データの書き込み # 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する schema = StructType([]) emptyDataFrame = spark.createDataFrame(sc.emptyRDD(), schema) emptyDataFrame.write.format("hudi"). \ options(**hudi_options). \ mode("overwrite"). \ save(hudiTablePath) # 書き込んだ結果を参照 # 書き込んだ結果をSpark SQLでクエリ sfDF = spark. \ read. \ format("hudi"). \ load(hudiTablePath) sfDF.createOrReplaceTempView("sf_table") spark.sql("select * from sf_table").show() job.commit()
実行し、完了を待ちます。
完了後、それぞれのリソースがどのようになったか確認します。
データファイルS3バケット:
$ aws --profile csa s3 ls --recursive s3://apache-hudi-bootstrap-test-data/ 2021-03-11 20:00:02 0 test-data/ 2021-03-11 20:00:13 2834 test-data/20210311-data.parquet 2021-03-11 20:00:14 2838 test-data/20210312-data.parquet 2021-03-11 20:00:15 2842 test-data/20210313-data.parquet
余計なファイルは作成されておらず、アップロード時のまま変わっていません!
メタデータS3バケット:
$ aws --profile csa s3 ls s3://apache-hudi-bootstrap-test-meta/hudi_test_data_table/ PRE .hoodie/ 2021-03-11 20:38:28 0 .hoodie_$folder$ 2021-03-11 20:38:45 93 .hoodie_partition_metadata 2021-03-11 20:38:46 433932 593ae849-63ad-47ab-a2da-0ddb39e6fe2a_1499-0-1499_00000000000001.parquet 2021-03-11 20:38:46 433910 db5993e1-1d27-406a-a746-2d22fb40f629_499-0-499_00000000000001.parquet 2021-03-11 20:38:46 433914 ef9be2f9-8a49-4175-91ee-c1ebdc48fd30_999-0-999_00000000000001.parquet
いくつかのparquetファイルが作成されています。
S3 Selectを利用して、ファイルの中身を確認してみます。
レコードキーとして指定した「id」カラム以外は実データの情報は存在せず、メタデータのみがファイル化されていますね。
Hudiテーブルから読み込んだデータフレームの表示結果をCloudwatch logsで確認してみます。
メタデータ,実データも全てクエリできています!
Glueデータカタログはどうでしょうか。
通常のテーブル作成時と同じように、メタカラムと実データカラムが含まれたテーブルが作成されました!
いい感じですね。
最後に、このテーブルに対してAthenaでクエリをかけてみます。
・・・メタデータは取得できましたが、実データのカラムはすべて値が空になってしまいました。。。
0.6.0時点ではまだ対応されていないのかもしれませんね。
引き続き検証していきたいと思います。
まとめ
Apache Hudiのブートストラップ機能をGlueジョブで試してみました。
実データ環境を汚すことなくApache Hudiを利用可能ですので、すでに存在するデータセットでApache Hudiを使いたい場合のハードルが下がりました。
一方で、ブートストラップで作成したテーブルはAthenaでクエリを正しく行えませんでした。
設定が悪いだけかもしれませんが、もし対応されていないのであれば今後の対応が待ち遠しいです!
最新バージョンの0.7.0ではどうなのかも今後試していこうと思います。
参照
※Apache®、Apache Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。